agentmux_srv\backend\blockcontroller/
health.rs1use std::collections::VecDeque;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14
15use serde::Serialize;
16
17use crate::backend::wps;
18
19#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
23#[serde(rename_all = "lowercase")]
24pub enum AgentHealth {
25 Healthy,
26 Idle,
27 Degraded,
28 Stalled,
29 Dead,
30 Exited,
31}
32
33impl AgentHealth {
34 pub fn as_str(&self) -> &'static str {
35 match self {
36 Self::Healthy => "healthy",
37 Self::Idle => "idle",
38 Self::Degraded => "degraded",
39 Self::Stalled => "stalled",
40 Self::Dead => "dead",
41 Self::Exited => "exited",
42 }
43 }
44}
45
/// How severe an agent error is, as far as health tracking is concerned.
///
/// The enum carries no data, so it is `Copy` and can be passed around by value
/// without moves or clones.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorClass {
    /// Expected to clear up on its own (rate limiting, overload). Enough of
    /// these inside the error window mark the agent `Degraded`.
    Transient,
    /// Not expected to recover by itself (e.g. authentication failures, or any
    /// unrecognized error). One inside the error window marks the agent `Dead`.
    Fatal,
}
52
53#[derive(Debug, Clone, Serialize)]
57pub struct AgentHealthEvent {
58 pub blockid: String,
59 pub health: String,
60 #[serde(skip_serializing_if = "Option::is_none")]
61 pub exit_code: Option<i32>,
62 pub detail: String,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub last_error: Option<String>,
65}
66
67struct ErrorTracker {
71 window: VecDeque<(Instant, ErrorClass)>,
72 window_duration: Duration,
73 consecutive_transient: u32,
74}
75
76impl ErrorTracker {
77 fn new(window_duration: Duration) -> Self {
78 Self {
79 window: VecDeque::new(),
80 window_duration,
81 consecutive_transient: 0,
82 }
83 }
84
85 fn prune(&mut self) {
86 let cutoff = Instant::now() - self.window_duration;
87 while self.window.front().is_some_and(|(t, _)| *t < cutoff) {
88 self.window.pop_front();
89 }
90 }
91
92 fn record(&mut self, class: ErrorClass) {
93 self.prune();
94 match class {
95 ErrorClass::Transient => self.consecutive_transient += 1,
96 ErrorClass::Fatal => self.consecutive_transient = 0,
97 }
98 self.window.push_back((Instant::now(), class));
99 }
100
101 fn record_success(&mut self) {
102 self.consecutive_transient = 0;
103 }
104
105 fn has_fatal(&self) -> bool {
106 self.window.iter().any(|(_, c)| *c == ErrorClass::Fatal)
107 }
108
109 fn transient_count(&self) -> usize {
110 self.window.iter().filter(|(_, c)| *c == ErrorClass::Transient).count()
111 }
112
113 fn reset(&mut self) {
114 self.window.clear();
115 self.consecutive_transient = 0;
116 }
117}
118
119struct HealthMonitorInner {
123 current_health: AgentHealth,
124 active_turn: bool,
125 last_output_ts: Instant,
126 last_meaningful_ts: Instant,
127 errors: ErrorTracker,
128 exit_code: Option<i32>,
129 last_error: Option<String>,
130}
131
132pub struct HealthMonitor {
137 block_id: String,
138 inner: Mutex<HealthMonitorInner>,
139 broker: Option<Arc<wps::Broker>>,
140}
141
142impl HealthMonitor {
143 const STALL_SECS: u64 = 30;
145 const DEAD_SECS: u64 = 120;
147 const ERROR_WINDOW_SECS: u64 = 300; const DEGRADED_TRANSIENT_THRESHOLD: usize = 5;
151
152 pub fn new(block_id: String, broker: Option<Arc<wps::Broker>>) -> Self {
153 let now = Instant::now();
154 Self {
155 block_id,
156 inner: Mutex::new(HealthMonitorInner {
157 current_health: AgentHealth::Idle,
158 active_turn: false,
159 last_output_ts: now,
160 last_meaningful_ts: now,
161 errors: ErrorTracker::new(Duration::from_secs(Self::ERROR_WINDOW_SECS)),
162 exit_code: None,
163 last_error: None,
164 }),
165 broker,
166 }
167 }
168
169 pub fn set_active_turn(&self, active: bool) {
171 let mut inner = self.inner.lock().unwrap();
172 inner.active_turn = active;
173 let now = Instant::now();
174 inner.last_output_ts = now;
175 inner.last_meaningful_ts = now;
176 if active {
177 inner.errors.reset();
178 inner.exit_code = None;
179 }
180 drop(inner);
181 self.evaluate_and_transition();
182 }
183
184 pub fn set_exited(&self, exit_code: i32) {
186 let mut inner = self.inner.lock().unwrap();
187 inner.active_turn = false;
188 inner.exit_code = Some(exit_code);
189 drop(inner);
190 self.evaluate_and_transition();
191 }
192
193 pub fn record_output(&self, meaningful: bool) {
196 let mut inner = self.inner.lock().unwrap();
197 let now = Instant::now();
198 inner.last_output_ts = now;
199 if meaningful {
200 inner.last_meaningful_ts = now;
201 inner.errors.record_success();
202 }
203 drop(inner);
204 let health = self.inner.lock().unwrap().current_health.clone();
207 if health == AgentHealth::Stalled || health == AgentHealth::Dead {
208 self.evaluate_and_transition();
209 }
210 }
211
212 pub fn record_error(&self, class: ErrorClass, message: String) {
214 let mut inner = self.inner.lock().unwrap();
215 inner.errors.record(class);
216 inner.last_error = Some(message);
217 drop(inner);
218 self.evaluate_and_transition();
219 }
220
221 pub fn is_active_turn(&self) -> bool {
223 self.inner.lock().unwrap().active_turn
224 }
225
226 pub fn check(&self) {
228 self.evaluate_and_transition();
229 }
230
231 fn evaluate_and_transition(&self) {
233 let mut inner = self.inner.lock().unwrap();
234 let new_health = Self::compute_health(&inner);
235
236 if new_health != inner.current_health {
237 let old = inner.current_health.clone();
238 inner.current_health = new_health.clone();
239 let detail = Self::make_detail(&inner, &new_health);
240 let event = AgentHealthEvent {
241 blockid: self.block_id.clone(),
242 health: new_health.as_str().to_string(),
243 exit_code: inner.exit_code,
244 detail,
245 last_error: inner.last_error.clone(),
246 };
247 drop(inner);
248
249 tracing::info!(
250 block_id = %self.block_id,
251 old = ?old,
252 new = ?new_health,
253 "agent health transition"
254 );
255 self.publish_health(event);
256 }
257 }
258
259 fn compute_health(inner: &HealthMonitorInner) -> AgentHealth {
261 if let Some(code) = inner.exit_code {
263 if code == 0 {
264 return AgentHealth::Idle; }
266 return AgentHealth::Exited;
267 }
268
269 if inner.errors.has_fatal() {
271 return AgentHealth::Dead;
272 }
273
274 if !inner.active_turn {
276 return AgentHealth::Idle;
277 }
278
279 let silence = inner.last_meaningful_ts.elapsed();
281 if silence > Duration::from_secs(Self::DEAD_SECS) {
282 return AgentHealth::Dead;
283 }
284 if silence > Duration::from_secs(Self::STALL_SECS) {
285 return AgentHealth::Stalled;
286 }
287
288 if inner.errors.transient_count() >= Self::DEGRADED_TRANSIENT_THRESHOLD {
290 return AgentHealth::Degraded;
291 }
292
293 AgentHealth::Healthy
294 }
295
296 fn make_detail(inner: &HealthMonitorInner, health: &AgentHealth) -> String {
298 match health {
299 AgentHealth::Healthy => "Agent is responding normally".to_string(),
300 AgentHealth::Idle => "Waiting for next message".to_string(),
301 AgentHealth::Degraded => {
302 format!(
303 "{} transient errors in the last 5 minutes",
304 inner.errors.transient_count()
305 )
306 }
307 AgentHealth::Stalled => {
308 let secs = inner.last_meaningful_ts.elapsed().as_secs();
309 format!("No output for {}s", secs)
310 }
311 AgentHealth::Dead => {
312 if inner.errors.has_fatal() {
313 inner
314 .last_error
315 .clone()
316 .unwrap_or_else(|| "Fatal error detected".to_string())
317 } else {
318 let secs = inner.last_meaningful_ts.elapsed().as_secs();
319 format!("Unresponsive for {}s", secs)
320 }
321 }
322 AgentHealth::Exited => {
323 format!("Exited with code {}", inner.exit_code.unwrap_or(-1))
324 }
325 }
326 }
327
328 fn publish_health(&self, event: AgentHealthEvent) {
330 if let Some(ref broker) = self.broker {
331 let wps_event = wps::WaveEvent {
332 event: wps::EVENT_AGENT_HEALTH.to_string(),
333 scopes: vec![format!("block:{}", self.block_id)],
334 sender: String::new(),
335 persist: 0,
336 data: serde_json::to_value(&event).ok(),
337 };
338 broker.publish(wps_event);
339 }
340 }
341}
342
343pub fn classify_output_line(
348 parsed: &serde_json::Value,
349) -> (bool, Option<(ErrorClass, String)>) {
350 let event_type = parsed.get("type").and_then(|v| v.as_str()).unwrap_or("");
351
352 match event_type {
353 "rate_limit_event" => {
354 (false, Some((ErrorClass::Transient, "Rate limited".to_string())))
355 }
356 "result" => {
357 let is_error = parsed
358 .get("is_error")
359 .and_then(|v| v.as_bool())
360 .unwrap_or(false);
361 if !is_error {
362 return (true, None);
363 }
364 let msg = parsed
365 .get("error")
366 .or_else(|| parsed.get("error_message"))
367 .and_then(|v| v.as_str())
368 .unwrap_or("")
369 .to_lowercase();
370
371 let class = if msg.contains("unauthorized")
372 || msg.contains("401")
373 || msg.contains("forbidden")
374 || msg.contains("403")
375 || msg.contains("token expired")
376 || msg.contains("authentication")
377 {
378 ErrorClass::Fatal
379 } else if msg.contains("overloaded")
380 || msg.contains("503")
381 || msg.contains("500")
382 || msg.contains("rate")
383 || msg.contains("capacity")
384 {
385 ErrorClass::Transient
386 } else {
387 ErrorClass::Fatal
389 };
390
391 (true, Some((class, msg)))
392 }
393 "stream_event" => {
395 if let Some(inner) = parsed.get("event") {
396 return classify_output_line(inner);
397 }
398 (true, None)
399 }
400 _ => (true, None),
401 }
402}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a JSON fixture; the fixtures are literals, so failure is a test bug.
    fn json(text: &str) -> serde_json::Value {
        serde_json::from_str(text).unwrap()
    }

    /// Snapshot of the monitor's current health, read under its lock.
    fn health_of(monitor: &HealthMonitor) -> AgentHealth {
        monitor.inner.lock().unwrap().current_health.clone()
    }

    #[test]
    fn test_error_tracker_basic() {
        let mut tracker = ErrorTracker::new(Duration::from_secs(300));
        assert!(!tracker.has_fatal());
        assert_eq!(tracker.transient_count(), 0);

        tracker.record(ErrorClass::Transient);
        assert_eq!(tracker.transient_count(), 1);
        assert!(!tracker.has_fatal());

        tracker.record(ErrorClass::Fatal);
        assert!(tracker.has_fatal());
    }

    #[test]
    fn test_classify_rate_limit() {
        let (meaningful, error) = classify_output_line(&json(r#"{"type":"rate_limit_event"}"#));
        assert!(!meaningful);
        assert!(matches!(error, Some((ErrorClass::Transient, _))));
    }

    #[test]
    fn test_classify_auth_error() {
        let event =
            json(r#"{"type":"result","is_error":true,"error":"Unauthorized: token expired"}"#);
        let (_, error) = classify_output_line(&event);
        assert!(matches!(error, Some((ErrorClass::Fatal, _))));
    }

    #[test]
    fn test_classify_overloaded() {
        let event =
            json(r#"{"type":"result","is_error":true,"error":"Service overloaded, try again"}"#);
        let (_, error) = classify_output_line(&event);
        assert!(matches!(error, Some((ErrorClass::Transient, _))));
    }

    #[test]
    fn test_classify_normal_result() {
        let event = json(r#"{"type":"result","is_error":false,"total_cost_usd":0.05}"#);
        let (meaningful, error) = classify_output_line(&event);
        assert!(meaningful);
        assert!(error.is_none());
    }

    #[test]
    fn test_health_monitor_lifecycle() {
        let monitor = HealthMonitor::new("test-block".to_string(), None);
        // A fresh monitor has no turn in progress.
        assert_eq!(health_of(&monitor), AgentHealth::Idle);

        monitor.set_active_turn(true);
        assert_eq!(health_of(&monitor), AgentHealth::Healthy);

        monitor.record_output(true);
        assert_eq!(health_of(&monitor), AgentHealth::Healthy);

        // A clean exit goes back to idle rather than "exited".
        monitor.set_exited(0);
        assert_eq!(health_of(&monitor), AgentHealth::Idle);
    }

    #[test]
    fn test_health_monitor_fatal_error() {
        let monitor = HealthMonitor::new("test-block".to_string(), None);
        monitor.set_active_turn(true);

        monitor.record_error(ErrorClass::Fatal, "Unauthorized".to_string());
        assert_eq!(health_of(&monitor), AgentHealth::Dead);
    }
}